%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_clients_SUITE).
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("proper/include/proper.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

all() ->
    AllTCs = emqx_common_test_helpers:all(?MODULE),
    [
        {group, persistent_sessions},
        {group, msgs_base64_encoding},
        {group, msgs_plain_encoding}
        | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases())
    ].

groups() ->
    [
        {persistent_sessions, persistent_session_testcases()},
        {msgs_base64_encoding, client_msgs_testcases()},
        {msgs_plain_encoding, client_msgs_testcases()}
    ].

persistent_session_testcases() ->
    [
        t_persistent_sessions1,
        t_persistent_sessions2,
        t_persistent_sessions3,
        t_persistent_sessions4,
        t_persistent_sessions5,
        t_list_clients_v2
    ].
client_msgs_testcases() ->
    [
        t_inflight_messages,
        t_mqueue_messages
    ].

init_per_suite(Config) ->
    ok = snabbkaffe:start_trace(),
    Apps = emqx_cth_suite:start(
        [
            emqx,
            emqx_conf,
            emqx_management,
            emqx_mgmt_api_test_util:emqx_dashboard()
        ],
        #{work_dir => emqx_cth_suite:work_dir(Config)}
    ),
    {ok, _} = emqx_common_test_http:create_default_app(),
    [{apps, Apps} | Config].

end_per_suite(Config) ->
    Apps = ?config(apps, Config),
    emqx_cth_suite:stop(Apps),
    ok.

init_per_group(persistent_sessions, Config) ->
    AppSpecs = [
        {emqx, "session_persistence.enable = true"},
        emqx_management
    ],
    Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
        "dashboard.listeners.http { enable = true, bind = 18084 }"
    ),
    Cluster = [
        {emqx_mgmt_api_clients_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}},
        {emqx_mgmt_api_clients_SUITE2, #{role => core, apps => AppSpecs}}
    ],
    Nodes = emqx_cth_cluster:start(
        Cluster,
        #{work_dir => emqx_cth_suite:work_dir(Config)}
    ),
    [{nodes, Nodes} | Config];
init_per_group(msgs_base64_encoding, Config) ->
    [{payload_encoding, base64} | Config];
init_per_group(msgs_plain_encoding, Config) ->
    [{payload_encoding, plain} | Config];
init_per_group(_Group, Config) ->
    Config.

end_per_group(persistent_sessions, Config) ->
    Nodes = ?config(nodes, Config),
    emqx_cth_cluster:stop(Nodes),
    ok;
end_per_group(_Group, _Config) ->
    ok.

init_per_testcase(_TC, Config) ->
    ok = snabbkaffe:start_trace(),
    Config.

end_per_testcase(TC, _Config) when
    TC =:= t_inflight_messages;
    TC =:= t_mqueue_messages
->
    ok = snabbkaffe:stop(),
    ClientId = atom_to_binary(TC),
    lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)),
    ok = emqx_common_test_helpers:wait_for(
        ?FUNCTION_NAME,
        ?LINE,
        fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end,
        5000
    ),
    ok = snabbkaffe:stop(),
    ok;
end_per_testcase(_TC, _Config) ->
    ok = snabbkaffe:stop(),
    ok.

t_clients(_) ->
    process_flag(trap_exit, true),

    Username1 = <<"user1">>,
    ClientId1 = <<"client1">>,

    Username2 = <<"user2">>,
    ClientId2 = <<"client2">>,

    Topic = <<"topic_1">>,
    Qos = 0,

    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),

    {ok, C1} = emqtt:start_link(#{
        username => Username1,
        clientid => ClientId1,
        proto_ver => v5,
        properties => #{'Session-Expiry-Interval' => 120}
    }),
    {ok, _} = emqtt:connect(C1),
    {ok, C2} = emqtt:start_link(#{username => Username2, clientid => ClientId2}),
    {ok, _} = emqtt:connect(C2),

    timer:sleep(300),

    %% get /clients
    ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
    {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
    ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]),
    ClientsMeta = maps:get(<<"meta">>, ClientsResponse),
    ClientsPage = maps:get(<<"page">>, ClientsMeta),
    ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
    ClientsCount = maps:get(<<"count">>, ClientsMeta),
    ?assertEqual(ClientsPage, 1),
    ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()),
    ?assertEqual(ClientsCount, 2),

    %% get /clients/:clientid
    Client1Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1)]),
    {ok, Client1} = emqx_mgmt_api_test_util:request_api(get, Client1Path),
    Client1Response = emqx_utils_json:decode(Client1, [return_maps]),
    ?assertEqual(Username1, maps:get(<<"username">>, Client1Response)),
    ?assertEqual(ClientId1, maps:get(<<"clientid">>, Client1Response)),
    ?assertEqual(120, maps:get(<<"expiry_interval">>, Client1Response)),

    %% delete /clients/:clientid kickout
    Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]),
    {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path),
    Kick =
        receive
            {'EXIT', C2, _} ->
                ok
        after 300 ->
            timeout
        end,
    ?assertEqual(ok, Kick),
    %% Client info is cleared after DOWN event
    ?retry(_Interval = 100, _Attempts = 5, begin
        AfterKickoutResponse2 = emqx_mgmt_api_test_util:request_api(get, Client2Path),
        ?assertEqual(AfterKickoutResponse2, {error, {"HTTP/1.1", 404, "Not Found"}})
    end),

    %% get /clients/:clientid/authorization/cache should have no authz cache
    Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path([
        "clients",
        binary_to_list(ClientId1),
        "authorization",
        "cache"
    ]),
    {ok, Client1AuthzCache0} = emqx_mgmt_api_test_util:request_api(get, Client1AuthzCachePath),
    ?assertEqual("[]", Client1AuthzCache0),

    %% post /clients/:clientid/subscribe
    SubscribeBody = #{topic => Topic, qos => Qos, nl => 1, rh => 1},
    SubscribePath = emqx_mgmt_api_test_util:api_path([
        "clients",
        binary_to_list(ClientId1),
        "subscribe"
    ]),
    {ok, _} = emqx_mgmt_api_test_util:request_api(
        post,
        SubscribePath,
        "",
        AuthHeader,
        SubscribeBody
    ),
    timer:sleep(100),
    {_, [{AfterSubTopic, #{qos := AfterSubQos}}]} = emqx_mgmt:list_client_subscriptions(ClientId1),
    ?assertEqual(AfterSubTopic, Topic),
    ?assertEqual(AfterSubQos, Qos),

    %% get /clients/:clientid/subscriptions
    SubscriptionsPath = emqx_mgmt_api_test_util:api_path([
        "clients",
        binary_to_list(ClientId1),
        "subscriptions"
    ]),
    {ok, SubscriptionsRes} = emqx_mgmt_api_test_util:request_api(
        get,
        SubscriptionsPath,
        "",
        AuthHeader
    ),
    [SubscriptionsData] = emqx_utils_json:decode(SubscriptionsRes, [return_maps]),
    ?assertMatch(
        #{
            <<"clientid">> := ClientId1,
            <<"nl">> := 1,
            <<"rap">> := 0,
            <<"rh">> := 1,
            <<"node">> := _,
            <<"qos">> := Qos,
            <<"topic">> := Topic
        },
        SubscriptionsData
    ),

    %% post /clients/:clientid/unsubscribe
    UnSubscribePath = emqx_mgmt_api_test_util:api_path([
        "clients",
        binary_to_list(ClientId1),
        "unsubscribe"
    ]),
    UnSubscribeBody = #{topic => Topic},
    {ok, _} = emqx_mgmt_api_test_util:request_api(
        post,
        UnSubscribePath,
        "",
        AuthHeader,
        UnSubscribeBody
    ),
    timer:sleep(100),
    ?assertEqual([], emqx_mgmt:list_client_subscriptions(ClientId1)),

    %% testcase cleanup, kickout client1
    {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path),
    timer:sleep(300),
    AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path),
    ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1).

t_persistent_sessions1(Config) ->
    [N1, _N2] = ?config(nodes, Config),
    APIPort = 18084,
    Port1 = get_mqtt_port(N1, tcp),

    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),

    ?check_trace(
        begin
            %% Scenario 1
            %% 1) Client connects and is listed as connected.
            ?tp(notice, "scenario 1", #{}),
            O = #{api_port => APIPort},
            ClientId = <<"c1">>,
            C1 = connect_client(#{port => Port1, clientid => ClientId}),
            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
            %% 2) Client disconnects and is listed as disconnected.
            ok = emqtt:disconnect(C1),
            assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
            %% 3) Client reconnects and is listed as connected.
            C2 = connect_client(#{port => Port1, clientid => ClientId}),
            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
            %% 4) Client disconnects.
            ok = emqtt:stop(C2),
            %% 5) Session is GC'ed, client is removed from list.
            ?tp(notice, "gc", #{}),
            %% simulate GC
            ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
            ?retry(
                100,
                20,
                ?assertMatch(
                    {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
                    list_request(APIPort)
                )
            ),
            ok
        end,
        []
    ),
    ok.

t_persistent_sessions2(Config) ->
    [N1, _N2] = ?config(nodes, Config),
    APIPort = 18084,
    Port1 = get_mqtt_port(N1, tcp),

    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),

    ?check_trace(
        begin
            %% Scenario 2
            %% 1) Client connects and is listed as connected.
            ?tp(notice, "scenario 2", #{}),
            O = #{api_port => APIPort},
            ClientId = <<"c2">>,
            C1 = connect_client(#{port => Port1, clientid => ClientId}),
            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
            unlink(C1),
            %% 2) Client connects to the same node and takes over, listed only once.
            C2 = connect_client(#{port => Port1, clientid => ClientId}),
            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
            ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}),
            ?retry(
                100,
                20,
                ?assertMatch(
                    {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
                    list_request(APIPort)
                )
            )
        end,
        []
    ),
    ok.

t_persistent_sessions3(Config) ->
    [N1, N2] = ?config(nodes, Config),
    APIPort = 18084,
    Port1 = get_mqtt_port(N1, tcp),
    Port2 = get_mqtt_port(N2, tcp),

    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),

    ?check_trace(
        begin
            %% Scenario 3
            %% 1) Client connects and is listed as connected.
            ?tp(notice, "scenario 3", #{}),
            O = #{api_port => APIPort},
            ClientId = <<"c3">>,
            C1 = connect_client(#{port => Port1, clientid => ClientId}),
            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
            unlink(C1),
            %% 2) Client connects to *another node* and takes over, listed only once.
            C2 = connect_client(#{port => Port2, clientid => ClientId}),
            assert_single_client(O#{node => N2, clientid => ClientId, status => connected}),
            %% Doesn't show up in the other node while alive
            ?retry(
                100,
                20,
                ?assertMatch(
                    {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
                    list_request(APIPort, "node=" ++ atom_to_list(N1))
                )
            ),
            ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
        end,
        []
    ),
    ok.

t_persistent_sessions4(Config) ->
    [N1, N2] = ?config(nodes, Config),
    APIPort = 18084,
    Port1 = get_mqtt_port(N1, tcp),
    Port2 = get_mqtt_port(N2, tcp),

    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),

    ?check_trace(
        begin
            %% Scenario 4
            %% 1) Client connects and is listed as connected.
            ?tp(notice, "scenario 4", #{}),
            O = #{api_port => APIPort},
            ClientId = <<"c4">>,
            C1 = connect_client(#{port => Port1, clientid => ClientId}),
            assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
            %% 2) Client disconnects and is listed as disconnected.
            ok = emqtt:stop(C1),
            %% While disconnected, shows up in both nodes.
            assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
            assert_single_client(O#{node => N2, clientid => ClientId, status => disconnected}),
            %% 3) Client reconnects to *another node* and is listed as connected once.
            C2 = connect_client(#{port => Port2, clientid => ClientId}),
            assert_single_client(O#{node => N2, clientid => ClientId, status => connected}),
            %% Doesn't show up in the other node while alive
            ?retry(
                100,
                20,
                ?assertMatch(
                    {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
                    list_request(APIPort, "node=" ++ atom_to_list(N1))
                )
            ),
            ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
        end,
        []
    ),
    ok.

t_persistent_sessions5(Config) ->
    [N1, N2] = ?config(nodes, Config),
    APIPort = 18084,
    Port1 = get_mqtt_port(N1, tcp),
    Port2 = get_mqtt_port(N2, tcp),

    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),

    ?check_trace(
        begin
            %% Pagination with mixed clients
            ClientId1 = <<"c5">>,
            ClientId2 = <<"c6">>,
            ClientId3 = <<"c7">>,
            ClientId4 = <<"c8">>,
            %% persistent
            C1 = connect_client(#{port => Port1, clientid => ClientId1}),
            C2 = connect_client(#{port => Port2, clientid => ClientId2}),
            %% in-memory
            C3 = connect_client(#{
                port => Port1, clientid => ClientId3, expiry => 0, clean_start => true
            }),
            C4 = connect_client(#{
                port => Port2, clientid => ClientId4, expiry => 0, clean_start => true
            }),

            P1 = list_request(APIPort, "limit=3&page=1"),
            P2 = list_request(APIPort, "limit=3&page=2"),
            ?assertMatch(
                {ok,
                    {{_, 200, _}, _, #{
                        <<"data">> := [_, _, _],
                        <<"meta">> := #{
                            %% TODO: if/when we fix the persistent session count, this
                            %% should be 4.
                            <<"count">> := 6,
                            <<"hasnext">> := true
                        }
                    }}},
                P1
            ),
            ?assertMatch(
                {ok,
                    {{_, 200, _}, _, #{
                        <<"data">> := [_],
                        <<"meta">> := #{
                            %% TODO: if/when we fix the persistent session count, this
                            %% should be 4.
                            <<"count">> := 6,
                            <<"hasnext">> := false
                        }
                    }}},
                P2
            ),
            {ok, {_, _, #{<<"data">> := R1}}} = P1,
            {ok, {_, _, #{<<"data">> := R2}}} = P2,
            ?assertEqual(
                lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]),
                lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R1 ++ R2))
            ),
            ?assertMatch(
                {ok,
                    {{_, 200, _}, _, #{
                        <<"data">> := [_, _],
                        <<"meta">> := #{
                            %% TODO: if/when we fix the persistent session count, this
                            %% should be 4.
                            <<"count">> := 6,
                            <<"hasnext">> := true
                        }
                    }}},
                list_request(APIPort, "limit=2&page=1")
            ),
            %% Disconnect persistent sessions
            lists:foreach(fun emqtt:stop/1, [C1, C2]),

            P3 =
                ?retry(200, 10, begin
                    P3_ = list_request(APIPort, "limit=3&page=1"),
                    ?assertMatch(
                        {ok,
                            {{_, 200, _}, _, #{
                                <<"data">> := [_, _, _],
                                <<"meta">> := #{
                                    <<"count">> := 4,
                                    <<"hasnext">> := true
                                }
                            }}},
                        P3_
                    ),
                    P3_
                end),
            P4 =
                ?retry(200, 10, begin
                    P4_ = list_request(APIPort, "limit=3&page=2"),
                    ?assertMatch(
                        {ok,
                            {{_, 200, _}, _, #{
                                <<"data">> := [_],
                                <<"meta">> := #{
                                    <<"count">> := 4,
                                    <<"hasnext">> := false
                                }
                            }}},
                        P4_
                    ),
                    P4_
                end),
            {ok, {_, _, #{<<"data">> := R3}}} = P3,
            {ok, {_, _, #{<<"data">> := R4}}} = P4,
            ?assertEqual(
                lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]),
                lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R3 ++ R4))
            ),

            lists:foreach(fun emqtt:stop/1, [C3, C4]),
            lists:foreach(
                fun(ClientId) ->
                    ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId])
                end,
                [ClientId1, ClientId2, ClientId3, ClientId4]
            ),

            ok
        end,
        []
    ),
    ok.

t_clients_bad_value_type(_) ->
    %% get /clients
    AuthHeader = [emqx_common_test_http:default_auth_header()],
    ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
    QsString = cow_qs:qs([{<<"ip_address">>, <<"127.0.0.1:8080">>}]),
    {ok, 400, Resp} = emqx_mgmt_api_test_util:request_api(
        get, ClientsPath, QsString, AuthHeader, [], #{compatible_mode => true}
    ),
    ?assertMatch(
        #{
            <<"code">> := <<"INVALID_PARAMETER">>,
            <<"message">> :=
                <<"the ip_address parameter expected type is ip, but the value is 127.0.0.1:8080">>
        },
        emqx_utils_json:decode(Resp, [return_maps])
    ).

t_authz_cache(_) ->
    ClientId = <<"client_authz">>,

    {ok, C} = emqtt:start_link(#{clientid => ClientId}),
    {ok, _} = emqtt:connect(C),
    {ok, _, _} = emqtt:subscribe(C, <<"topic/1">>, 1),

    ClientAuthzCachePath = emqx_mgmt_api_test_util:api_path([
        "clients",
        binary_to_list(ClientId),
        "authorization",
        "cache"
    ]),
    {ok, ClientAuthzCache} = emqx_mgmt_api_test_util:request_api(get, ClientAuthzCachePath),
    ?assertMatch(
        [
            #{
                <<"access">> :=
                    #{<<"action_type">> := <<"subscribe">>, <<"qos">> := 1},
                <<"result">> := <<"allow">>,
                <<"topic">> := <<"topic/1">>,
                <<"updated_time">> := _
            }
        ],
        emqx_utils_json:decode(ClientAuthzCache, [return_maps])
    ),

    ok = emqtt:stop(C).

t_kickout_clients(_) ->
    process_flag(trap_exit, true),

    ClientId1 = <<"client1">>,
    ClientId2 = <<"client2">>,
    ClientId3 = <<"client3">>,

    {ok, C1} = emqtt:start_link(#{
        clientid => ClientId1,
        proto_ver => v5,
        properties => #{'Session-Expiry-Interval' => 120}
    }),
    {ok, _} = emqtt:connect(C1),
    {ok, C2} = emqtt:start_link(#{clientid => ClientId2}),
    {ok, _} = emqtt:connect(C2),
    {ok, C3} = emqtt:start_link(#{clientid => ClientId3}),
    {ok, _} = emqtt:connect(C3),

    emqx_common_test_helpers:wait_for(
        ?FUNCTION_NAME,
        ?LINE,
        fun() ->
            try
                [_] = emqx_cm:lookup_channels(ClientId1),
                [_] = emqx_cm:lookup_channels(ClientId2),
                [_] = emqx_cm:lookup_channels(ClientId3),
                true
            catch
                error:badmatch ->
                    false
            end
        end,
        2000
    ),

    %% get /clients
    ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
    {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
    ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]),
    ClientsMeta = maps:get(<<"meta">>, ClientsResponse),
    ClientsPage = maps:get(<<"page">>, ClientsMeta),
    ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
    ClientsCount = maps:get(<<"count">>, ClientsMeta),
    ?assertEqual(ClientsPage, 1),
    ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()),
    ?assertEqual(ClientsCount, 3),

    %% kickout clients
    KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]),
    KickoutBody = [ClientId1, ClientId2, ClientId3],
    {ok, 204, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody),

    ReceiveExit = fun({ClientPid, ClientId}) ->
        receive
            {'EXIT', Pid, _} when Pid =:= ClientPid ->
                ok
        after 1000 ->
            error({timeout, ClientId})
        end
    end,
    lists:foreach(ReceiveExit, [{C1, ClientId1}, {C2, ClientId2}, {C3, ClientId3}]),
    {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
    ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]),
    ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2).

t_query_clients_with_time(_) ->
    process_flag(trap_exit, true),

    Username1 = <<"user1">>,
    ClientId1 = <<"client1">>,

    Username2 = <<"user2">>,
    ClientId2 = <<"client2">>,

    {ok, C1} = emqtt:start_link(#{username => Username1, clientid => ClientId1}),
    {ok, _} = emqtt:connect(C1),
    {ok, C2} = emqtt:start_link(#{username => Username2, clientid => ClientId2}),
    {ok, _} = emqtt:connect(C2),

    timer:sleep(100),

    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
    %% get /clients with time(rfc3339)
    NowTimeStampInt = erlang:system_time(millisecond),
    %% Do not uri_encode `=` to `%3D`
    Rfc3339String = emqx_http_lib:uri_encode(
        binary:bin_to_list(
            emqx_utils_calendar:epoch_to_rfc3339(NowTimeStampInt)
        )
    ),
    TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)),

    LteKeys = ["lte_created_at=", "lte_connected_at="],
    GteKeys = ["gte_created_at=", "gte_connected_at="],
    LteParamRfc3339 = [Param ++ Rfc3339String || Param <- LteKeys],
    LteParamStamp = [Param ++ TimeStampString || Param <- LteKeys],
    GteParamRfc3339 = [Param ++ Rfc3339String || Param <- GteKeys],
    GteParamStamp = [Param ++ TimeStampString || Param <- GteKeys],

    RequestResults =
        [
            emqx_mgmt_api_test_util:request_api(get, ClientsPath, Param, AuthHeader)
         || Param <-
                LteParamRfc3339 ++ LteParamStamp ++
                    GteParamRfc3339 ++ GteParamStamp
        ],
    DecodedResults = [
        emqx_utils_json:decode(Response, [return_maps])
     || {ok, Response} <- RequestResults
    ],
    {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults),
    %% EachData :: list()
    [
        ?assert(time_string_to_epoch_millisecond(CreatedAt) < NowTimeStampInt)
     || #{<<"data">> := EachData} <- LteResponseDecodeds,
        #{<<"created_at">> := CreatedAt} <- EachData
    ],
    [
        ?assert(time_string_to_epoch_millisecond(ConnectedAt) < NowTimeStampInt)
     || #{<<"data">> := EachData} <- LteResponseDecodeds,
        #{<<"connected_at">> := ConnectedAt} <- EachData
    ],
    [
        ?assertEqual(EachData, [])
     || #{<<"data">> := EachData} <- GteResponseDecodeds
    ],

    %% testcase cleanup, kickout client1 and client2
    Client1Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1)]),
    Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]),
    {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path),
    {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path).

t_query_multiple_clients(_) ->
    process_flag(trap_exit, true),
    ClientIdsUsers = [
        {<<"multi_client1">>, <<"multi_user1">>},
        {<<"multi_client1-1">>, <<"multi_user1">>},
        {<<"multi_client2">>, <<"multi_user2">>},
        {<<"multi_client2-1">>, <<"multi_user2">>},
        {<<"multi_client3">>, <<"multi_user3">>},
        {<<"multi_client3-1">>, <<"multi_user3">>},
        {<<"multi_client4">>, <<"multi_user4">>},
        {<<"multi_client4-1">>, <<"multi_user4">>}
    ],
    _Clients = lists:map(
        fun({ClientId, Username}) ->
            {ok, C} = emqtt:start_link(#{clientid => ClientId, username => Username}),
            {ok, _} = emqtt:connect(C),
            C
        end,
        ClientIdsUsers
    ),
    timer:sleep(100),

    Auth = emqx_mgmt_api_test_util:auth_header_(),

    %% Not found clients/users
    ?assertEqual([], get_clients(Auth, "clientid=no_such_client")),
    ?assertEqual([], get_clients(Auth, "clientid=no_such_client&clientid=no_such_client1")),
    %% Duplicates must cause no issues
    ?assertEqual([], get_clients(Auth, "clientid=no_such_client&clientid=no_such_client")),
    ?assertEqual([], get_clients(Auth, "username=no_such_user&clientid=no_such_user1")),
    ?assertEqual([], get_clients(Auth, "username=no_such_user&clientid=no_such_user")),
    ?assertEqual(
        [],
        get_clients(
            Auth,
            "clientid=no_such_client&clientid=no_such_client"
            "username=no_such_user&clientid=no_such_user1"
        )
    ),

    %% Requested ClientId / username values relate to different clients
    ?assertEqual([], get_clients(Auth, "clientid=multi_client1&username=multi_user2")),
    ?assertEqual(
        [],
        get_clients(
            Auth,
            "clientid=multi_client1&clientid=multi_client1-1"
            "&username=multi_user2&username=multi_user3"
        )
    ),
    ?assertEqual([<<"multi_client1">>], get_clients(Auth, "clientid=multi_client1")),
    %% Duplicates must cause no issues
    ?assertEqual(
        [<<"multi_client1">>], get_clients(Auth, "clientid=multi_client1&clientid=multi_client1")
    ),
    ?assertEqual(
        [<<"multi_client1">>], get_clients(Auth, "clientid=multi_client1&username=multi_user1")
    ),
    ?assertEqual(
        lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
        lists:sort(get_clients(Auth, "username=multi_user1"))
    ),
    ?assertEqual(
        lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
        lists:sort(get_clients(Auth, "clientid=multi_client1&clientid=multi_client1-1"))
    ),
    ?assertEqual(
        lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
        lists:sort(
            get_clients(
                Auth,
                "clientid=multi_client1&clientid=multi_client1-1"
                "&username=multi_user1"
            )
        )
    ),
    ?assertEqual(
        lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
        lists:sort(
            get_clients(
                Auth,
                "clientid=no-such-client&clientid=multi_client1&clientid=multi_client1-1"
                "&username=multi_user1"
            )
        )
    ),
    ?assertEqual(
        lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
        lists:sort(
            get_clients(
                Auth,
                "clientid=no-such-client&clientid=multi_client1&clientid=multi_client1-1"
                "&username=multi_user1&username=no-such-user"
            )
        )
    ),

    AllQsFun = fun(QsKey, Pos) ->
        QsParts = [
            QsKey ++ "=" ++ binary_to_list(element(Pos, ClientUser))
         || ClientUser <- ClientIdsUsers
        ],
        lists:flatten(lists:join("&", QsParts))
    end,
    AllClientsQs = AllQsFun("clientid", 1),
    AllUsersQs = AllQsFun("username", 2),
    AllClientIds = lists:sort([C || {C, _U} <- ClientIdsUsers]),

    ?assertEqual(AllClientIds, lists:sort(get_clients(Auth, AllClientsQs))),
    ?assertEqual(AllClientIds, lists:sort(get_clients(Auth, AllUsersQs))),
    ?assertEqual(AllClientIds, lists:sort(get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs))),

    %% Test with other filter params
    NodeQs = "&node=" ++ atom_to_list(node()),
    NoNodeQs = "&node=nonode@nohost",
    ?assertEqual(
        AllClientIds, lists:sort(get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ NodeQs))
    ),
    ?assertMatch(
        {error, _}, get_clients_expect_error(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ NoNodeQs)
    ),

    %% fuzzy search (like_{key}) must be ignored if accurate filter ({key}) is present
    ?assertEqual(
        AllClientIds,
        lists:sort(get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ "&like_clientid=multi"))
    ),
    ?assertEqual(
        AllClientIds,
        lists:sort(get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ "&like_username=multi"))
    ),
    ?assertEqual(
        AllClientIds,
        lists:sort(
            get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ "&like_clientid=does-not-matter")
        )
    ),
    ?assertEqual(
        AllClientIds,
        lists:sort(
            get_clients(Auth, AllClientsQs ++ "&" ++ AllUsersQs ++ "&like_username=does-not-matter")
        )
    ),

    %% Combining multiple clientids with like_username and vice versa must narrow down search results
    ?assertEqual(
        lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
        lists:sort(get_clients(Auth, AllClientsQs ++ "&like_username=user1"))
    ),
    ?assertEqual(
        lists:sort([<<"multi_client1">>, <<"multi_client1-1">>]),
        lists:sort(get_clients(Auth, AllUsersQs ++ "&like_clientid=client1"))
    ),
    ?assertEqual([], get_clients(Auth, AllClientsQs ++ "&like_username=nouser")),
    ?assertEqual([], get_clients(Auth, AllUsersQs ++ "&like_clientid=nouser")).

t_query_multiple_clients_urlencode(_) ->
    process_flag(trap_exit, true),
    ClientIdsUsers = [
        {<<"multi_client=a?">>, <<"multi_user=a?">>},
        {<<"mutli_client=b?">>, <<"multi_user=b?">>}
    ],
    _Clients = lists:map(
        fun({ClientId, Username}) ->
            {ok, C} = emqtt:start_link(#{clientid => ClientId, username => Username}),
            {ok, _} = emqtt:connect(C),
            C
        end,
        ClientIdsUsers
    ),
    timer:sleep(100),

    Auth = emqx_mgmt_api_test_util:auth_header_(),
    ClientsQs = uri_string:compose_query([{<<"clientid">>, C} || {C, _} <- ClientIdsUsers]),
    UsersQs = uri_string:compose_query([{<<"username">>, U} || {_, U} <- ClientIdsUsers]),
    ExpectedClients = lists:sort([C || {C, _} <- ClientIdsUsers]),
    ?assertEqual(ExpectedClients, lists:sort(get_clients(Auth, ClientsQs))),
    ?assertEqual(ExpectedClients, lists:sort(get_clients(Auth, UsersQs))).

t_query_clients_with_fields(_) ->
    process_flag(trap_exit, true),
    TCBin = atom_to_binary(?FUNCTION_NAME),
    ClientId = <<TCBin/binary, "_client">>,
    Username = <<TCBin/binary, "_user">>,
    {ok, C} = emqtt:start_link(#{clientid => ClientId, username => Username}),
    {ok, _} = emqtt:connect(C),
    timer:sleep(100),

    Auth = emqx_mgmt_api_test_util:auth_header_(),
    ?assertEqual([#{<<"clientid">> => ClientId}], get_clients_all_fields(Auth, "fields=clientid")),
    ?assertEqual(
        [#{<<"clientid">> => ClientId, <<"username">> => Username}],
        get_clients_all_fields(Auth, "fields=clientid,username")
    ),

    AllFields = get_clients_all_fields(Auth, "fields=all"),
    DefaultFields = get_clients_all_fields(Auth, ""),

    ?assertEqual(AllFields, DefaultFields),
    ?assertMatch(
        [#{<<"clientid">> := ClientId, <<"username">> := Username}],
        AllFields
    ),
    ?assert(map_size(hd(AllFields)) > 2),
    ?assertMatch({error, _}, get_clients_expect_error(Auth, "fields=bad_field_name")),
    ?assertMatch({error, _}, get_clients_expect_error(Auth, "fields=all,bad_field_name")),
    ?assertMatch({error, _}, get_clients_expect_error(Auth, "fields=all,username,clientid")).

get_clients_all_fields(Auth, Qs) ->
    get_clients(Auth, Qs, false, false).

get_clients_expect_error(Auth, Qs) ->
    get_clients(Auth, Qs, true, true).

get_clients(Auth, Qs) ->
    get_clients(Auth, Qs, false, true).

get_clients(Auth, Qs, ExpectError, ClientIdOnly) ->
    ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
    Resp = emqx_mgmt_api_test_util:request_api(get, ClientsPath, Qs, Auth),
    case ExpectError of
        false ->
            {ok, Body} = Resp,
            #{<<"data">> := Clients} = emqx_utils_json:decode(Body),
            case ClientIdOnly of
                true -> [ClientId || #{<<"clientid">> := ClientId} <- Clients];
                false -> Clients
            end;
        true ->
            Resp
    end.

t_keepalive(_Config) ->
    Username = "user_keepalive",
    ClientId = "client_keepalive",
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "keepalive"]),
    Body = #{interval => 11},
    {error, {"HTTP/1.1", 404, "Not Found"}} =
        emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
    %% 65535 is the max value of keepalive
    MaxKeepalive = 65535,
    InitKeepalive = round(MaxKeepalive / 1.5 + 1),
    {ok, C1} = emqtt:start_link(#{
        username => Username, clientid => ClientId, keepalive => InitKeepalive
    }),
    {ok, _} = emqtt:connect(C1),
    [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
    %% will reset to max keepalive if keepalive > max keepalive
    #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid),
    ?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))),

    {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
    #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]),
    #{conninfo := #{keepalive := Keepalive}} = emqx_connection:info(Pid),
    ?assertEqual(11, Keepalive),
    %% Disable keepalive
    Body1 = #{interval => 0},
    {ok, NewClient1} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body1),
    #{<<"keepalive">> := 0} = emqx_utils_json:decode(NewClient1, [return_maps]),
    ?assertMatch(#{conninfo := #{keepalive := 0}}, emqx_connection:info(Pid)),
    %% Maximal keepalive
    Body2 = #{interval => 65536},
    {error, {"HTTP/1.1", 400, _}} =
        emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body2),
    emqtt:disconnect(C1),
    ok.

t_client_id_not_found(_Config) ->
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    Http = {"HTTP/1.1", 404, "Not Found"},
    Body = "{\"code\":\"CLIENTID_NOT_FOUND\",\"message\":\"Client ID not found\"}",

    PathFun = fun(Suffix) ->
        emqx_mgmt_api_test_util:api_path(["clients", "no_existed_clientid"] ++ Suffix)
    end,
    ReqFun = fun(Method, Path) ->
        emqx_mgmt_api_test_util:request_api(
            Method, Path, "", AuthHeader, [], #{return_all => true}
        )
    end,

    PostFun = fun(Method, Path, Data) ->
        emqx_mgmt_api_test_util:request_api(
            Method, Path, "", AuthHeader, Data, #{return_all => true}
        )
    end,

    %% Client lookup
    ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun([]))),
    %% Client kickout
    ?assertMatch({error, {Http, _, Body}}, ReqFun(delete, PathFun([]))),
    %% Client Subscription list
    ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["subscriptions"]))),
    %% AuthZ Cache lookup
    ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["authorization", "cache"]))),
    %% AuthZ Cache clean
    ?assertMatch({error, {Http, _, Body}}, ReqFun(delete, PathFun(["authorization", "cache"]))),
    %% Client Subscribe
    SubBody = #{topic => <<"testtopic">>, qos => 1, nl => 1, rh => 1},
    ?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["subscribe"]), SubBody)),
    ?assertMatch(
        {error, {Http, _, Body}}, PostFun(post, PathFun(["subscribe", "bulk"]), [SubBody])
    ),
    %% Client Unsubscribe
    UnsubBody = #{topic => <<"testtopic">>},
    ?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe"]), UnsubBody)),
    ?assertMatch(
        {error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody])
    ),
    %% Mqueue messages
    ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))),
    %% Inflight messages
    ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))).

t_sessions_count(_Config) ->
    ClientId = atom_to_binary(?FUNCTION_NAME),
    Topic = <<"t/test_sessions_count">>,
    Conf0 = emqx_config:get([broker]),
    Conf1 = hocon_maps:deep_merge(Conf0, #{session_history_retain => 5}),
    %% from 1 seconds ago, which is for sure less than histry retain duration
    %% hence force a call to the gen_server emqx_cm_registry_keeper
    Since = erlang:system_time(seconds) - 1,
    ok = emqx_config:put(#{broker => Conf1}),
    {ok, Client} = emqtt:start_link([
        {proto_ver, v5},
        {clientid, ClientId},
        {clean_start, true}
    ]),
    {ok, _} = emqtt:connect(Client),
    {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
    Path = emqx_mgmt_api_test_util:api_path(["sessions_count"]),
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    ?assertMatch(
        {ok, "1"},
        emqx_mgmt_api_test_util:request_api(
            get, Path, "since=" ++ integer_to_list(Since), AuthHeader
        )
    ),
    ok = emqtt:disconnect(Client),
    %% simulate the situation in which the process is not running
    ok = supervisor:terminate_child(emqx_cm_sup, emqx_cm_registry_keeper),
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(
            get, Path, "since=" ++ integer_to_list(Since), AuthHeader
        )
    ),
    %% restore default value
    ok = emqx_config:put(#{broker => Conf0}),
    ok = emqx_cm_registry_keeper:purge(),
    ok.

t_mqueue_messages(Config) ->
    ClientId = atom_to_binary(?FUNCTION_NAME),
    Topic = <<"t/test_mqueue_msgs">>,
    Count = emqx_mgmt:default_row_limit(),
    {ok, _Client} = client_with_mqueue(ClientId, Topic, Count),
    Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
    ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    IsMqueue = true,
    test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config), IsMqueue),

    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(
            get, Path, "limit=10&position=not-valid", AuthHeader
        )
    ),
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(
            get, Path, "limit=-5&position=not-valid", AuthHeader
        )
    ).

t_inflight_messages(Config) ->
    ClientId = atom_to_binary(?FUNCTION_NAME),
    Topic = <<"t/test_inflight_msgs">>,
    PubCount = emqx_mgmt:default_row_limit(),
    {ok, Client} = client_with_inflight(ClientId, Topic, PubCount),
    Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]),
    InflightLimit = emqx:get_config([mqtt, max_inflight]),
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    IsMqueue = false,
    test_messages(
        Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config), IsMqueue
    ),

    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(
            get, Path, "limit=10&position=not-int", AuthHeader
        )
    ),
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(
            get, Path, "limit=-5&position=invalid-int", AuthHeader
        )
    ),
    emqtt:stop(Client).

client_with_mqueue(ClientId, Topic, Count) ->
    {ok, Client} = emqtt:start_link([
        {proto_ver, v5},
        {clientid, ClientId},
        {clean_start, false},
        {properties, #{'Session-Expiry-Interval' => 120}}
    ]),
    {ok, _} = emqtt:connect(Client),
    {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
    ok = emqtt:disconnect(Client),
    publish_msgs(Topic, Count),
    {ok, Client}.

client_with_inflight(ClientId, Topic, Count) ->
    {ok, Client} = emqtt:start_link([
        {proto_ver, v5},
        {clientid, ClientId},
        {clean_start, true},
        {auto_ack, never}
    ]),
    {ok, _} = emqtt:connect(Client),
    {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
    publish_msgs(Topic, Count),
    {ok, Client}.

publish_msgs(Topic, Count) ->
    lists:foreach(
        fun(Seq) ->
            emqx_broker:publish(emqx_message:make(undefined, ?QOS_1, Topic, integer_to_binary(Seq)))
        end,
        lists:seq(1, Count)
    ).

test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
    Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
    {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
    #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
    #{<<"start">> := StartPos, <<"position">> := Pos} = Meta,

    ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
    ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
    ?assertEqual(length(Msgs), Count),

    lists:foreach(
        fun({Seq, #{<<"payload">> := P} = M}) ->
            ?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))),
            ?assertMatch(
                #{
                    <<"msgid">> := _,
                    <<"topic">> := Topic,
                    <<"qos">> := _,
                    <<"publish_at">> := _,
                    <<"from_clientid">> := _,
                    <<"from_username">> := _,
                    <<"inserted_at">> := _
                },
                M
            ),
            IsMqueue andalso ?assertMatch(#{<<"mqueue_priority">> := _}, M)
        end,
        lists:zip(lists:seq(1, Count), Msgs)
    ),

    %% The first message payload is <<"1">>,
    %% and when it is urlsafe base64 encoded (with no padding), it's <<"MQ">>,
    %% so we cover both cases:
    %% - when total payload size exceeds the limit,
    %% - when the first message payload already exceeds the limit but is still returned in the response.
    QsPayloadLimit = io_lib:format("payload=~s&max_payload_bytes=1", [PayloadEncoding]),
    {ok, LimitedMsgsResp} = emqx_mgmt_api_test_util:request_api(
        get, Path, QsPayloadLimit, AuthHeader
    ),
    #{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp),
    ?assertEqual(1, length(FirstMsgOnly)),
    ?assertEqual(
        <<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding)
    ),

    Limit = 19,
    LastPos = lists:foldl(
        fun(PageSeq, ThisPos) ->
            Qs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, ThisPos, Limit]),
            {ok, MsgsRespPage} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader),
            #{
                <<"meta">> := #{<<"position">> := NextPos, <<"start">> := ThisStart},
                <<"data">> := MsgsPage
            } = emqx_utils_json:decode(MsgsRespPage),

            ?assertEqual(NextPos, msg_pos(lists:last(MsgsPage), IsMqueue)),
            %% Start position is the same in every response and points to the first msg
            ?assertEqual(StartPos, ThisStart),

            ?assertEqual(length(MsgsPage), Limit),
            ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1),
            ExpLastPayload = integer_to_binary(PageSeq * Limit),
            ?assertEqual(
                ExpFirstPayload,
                decode_payload(maps:get(<<"payload">>, hd(MsgsPage)), PayloadEncoding)
            ),
            ?assertEqual(
                ExpLastPayload,
                decode_payload(maps:get(<<"payload">>, lists:last(MsgsPage)), PayloadEncoding)
            ),
            NextPos
        end,
        none,
        lists:seq(1, Count div 19)
    ),
    LastPartialPage = Count div 19 + 1,
    LastQs = io_lib:format("payload=~s&position=~s&limit=~p", [PayloadEncoding, LastPos, Limit]),
    {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader),
    #{<<"meta">> := #{<<"position">> := LastPartialPos}, <<"data">> := MsgsLastPage} = emqx_utils_json:decode(
        MsgsRespLastP
    ),
    %% The same as the position of all messages returned in one request
    ?assertEqual(Pos, LastPartialPos),

    ?assertEqual(
        integer_to_binary(LastPartialPage * Limit - Limit + 1),
        decode_payload(maps:get(<<"payload">>, hd(MsgsLastPage)), PayloadEncoding)
    ),
    ?assertEqual(
        integer_to_binary(Count),
        decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastPage)), PayloadEncoding)
    ),

    ExceedQs = io_lib:format("payload=~s&position=~s&limit=~p", [
        PayloadEncoding, LastPartialPos, Limit
    ]),
    {ok, MsgsEmptyResp} = emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader),
    ?assertMatch(
        #{
            <<"data">> := [],
            <<"meta">> := #{<<"position">> := LastPartialPos, <<"start">> := StartPos}
        },
        emqx_utils_json:decode(MsgsEmptyResp)
    ),

    %% Invalid common page params
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(get, Path, "limit=0", AuthHeader)
    ),
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(get, Path, "limit=limit", AuthHeader)
    ),

    %% Invalid max_paylod_bytes param
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0", AuthHeader)
    ),
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=-1", AuthHeader)
    ),
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=-1MB", AuthHeader)
    ),
    ?assertMatch(
        {error, {_, 400, _}},
        emqx_mgmt_api_test_util:request_api(get, Path, "max_payload_bytes=0MB", AuthHeader)
    ).

msg_pos(#{<<"inserted_at">> := TsBin, <<"mqueue_priority">> := Prio} = _Msg, true = _IsMqueue) ->
    <<TsBin/binary, "_", (emqx_utils_conv:bin(Prio))/binary>>;
msg_pos(#{<<"inserted_at">> := TsBin} = _Msg, _IsMqueue) ->
    TsBin.

decode_payload(Payload, base64) ->
    base64:decode(Payload);
decode_payload(Payload, _) ->
    Payload.

t_subscribe_shared_topic(_Config) ->
    ClientId = <<"client_subscribe_shared">>,

    {ok, C} = emqtt:start_link(#{clientid => ClientId}),
    {ok, _} = emqtt:connect(C),

    ClientPuber = <<"publish_client">>,
    {ok, PC} = emqtt:start_link(#{clientid => ClientPuber}),
    {ok, _} = emqtt:connect(PC),

    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),

    Http200 = {"HTTP/1.1", 200, "OK"},
    Http204 = {"HTTP/1.1", 204, "No Content"},

    PathFun = fun(Suffix) ->
        emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix)
    end,

    PostFun = fun(Method, Path, Data) ->
        emqx_mgmt_api_test_util:request_api(
            Method, Path, "", AuthHeader, Data, #{return_all => true}
        )
    end,

    SharedT = <<"$share/group/testtopic">>,
    NonSharedT = <<"t/#">>,

    SubBodyFun = fun(T) -> #{topic => T, qos => 1, nl => 0, rh => 1} end,
    UnSubBodyFun = fun(T) -> #{topic => T} end,

    %% ====================
    %% Client Subscribe
    ?assertMatch(
        {ok, {Http200, _, _}},
        PostFun(post, PathFun(["subscribe"]), SubBodyFun(SharedT))
    ),
    ?assertMatch(
        {ok, {Http200, _, _}},
        PostFun(
            post,
            PathFun(["subscribe", "bulk"]),
            [SubBodyFun(T) || T <- [SharedT, NonSharedT]]
        )
    ),

    %% assert subscription
    ?assertMatch(
        [
            {_, #share{group = <<"group">>, topic = <<"testtopic">>}},
            {_, <<"t/#">>}
        ],
        ets:tab2list(?SUBSCRIPTION)
    ),

    ?assertMatch(
        [
            {{#share{group = <<"group">>, topic = <<"testtopic">>}, _}, #{
                nl := 0, qos := 1, rh := 1, rap := 0
            }},
            {{<<"t/#">>, _}, #{nl := 0, qos := 1, rh := 1, rap := 0}}
        ],
        ets:tab2list(?SUBOPTION)
    ),
    ?assertMatch(
        [{emqx_shared_subscription, <<"group">>, <<"testtopic">>, _}],
        ets:tab2list(emqx_shared_subscription)
    ),

    %% assert subscription virtual
    _ = emqtt:publish(PC, <<"testtopic">>, <<"msg1">>, [{qos, 0}]),
    ?assertReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg1">>}}),
    _ = emqtt:publish(PC, <<"t/1">>, <<"msg2">>, [{qos, 0}]),
    ?assertReceive({publish, #{topic := <<"t/1">>, payload := <<"msg2">>}}),

    %% ====================
    %% Client Unsubscribe
    ?assertMatch(
        {ok, {Http204, _, _}},
        PostFun(post, PathFun(["unsubscribe"]), UnSubBodyFun(SharedT))
    ),
    ?assertMatch(
        {ok, {Http204, _, _}},
        PostFun(
            post,
            PathFun(["unsubscribe", "bulk"]),
            [UnSubBodyFun(T) || T <- [SharedT, NonSharedT]]
        )
    ),

    %% assert subscription
    ?assertEqual([], ets:tab2list(?SUBSCRIPTION)),
    ?assertEqual([], ets:tab2list(?SUBOPTION)),
    ?assertEqual([], ets:tab2list(emqx_shared_subscription)),

    %% assert subscription virtual
    _ = emqtt:publish(PC, <<"testtopic">>, <<"msg3">>, [{qos, 0}]),
    _ = emqtt:publish(PC, <<"t/1">>, <<"msg4">>, [{qos, 0}]),
    ?assertNotReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg3">>}}),
    ?assertNotReceive({publish, #{topic := <<"t/1">>, payload := <<"msg4">>}}).

t_subscribe_shared_topic_nl(_Config) ->
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    Http400 = {"HTTP/1.1", 400, "Bad Request"},
    Body =
        "{\"code\":\"INVALID_PARAMETER\","
        "\"message\":\"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]\"}",
    ClientId = <<"client_subscribe_shared">>,

    {ok, C} = emqtt:start_link(#{clientid => ClientId}),
    {ok, _} = emqtt:connect(C),

    PathFun = fun(Suffix) ->
        emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix)
    end,
    PostFun = fun(Method, Path, Data) ->
        emqx_mgmt_api_test_util:request_api(
            Method, Path, "", AuthHeader, Data, #{return_all => true}
        )
    end,
    T = <<"$share/group/testtopic">>,
    ?assertMatch(
        {error, {Http400, _, Body}},
        PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1})
    ).

t_list_clients_v2(Config) ->
    [N1, N2] = ?config(nodes, Config),
    APIPort = 18084,
    Port1 = get_mqtt_port(N1, tcp),
    Port2 = get_mqtt_port(N2, tcp),

    ?check_trace(
        begin
            ClientId1 = <<"ca1">>,
            ClientId2 = <<"c2">>,
            ClientId3 = <<"c3">>,
            ClientId4 = <<"ca4">>,
            ClientId5 = <<"ca5">>,
            ClientId6 = <<"c6">>,
            AllClientIds = [
                ClientId1,
                ClientId2,
                ClientId3,
                ClientId4,
                ClientId5,
                ClientId6
            ],
            C1 = connect_client(#{port => Port1, clientid => ClientId1, clean_start => true}),
            C2 = connect_client(#{port => Port2, clientid => ClientId2, clean_start => true}),
            C3 = connect_client(#{port => Port1, clientid => ClientId3, clean_start => true}),
            C4 = connect_client(#{port => Port2, clientid => ClientId4, clean_start => true}),
            %% in-memory clients
            C5 = connect_client(#{
                port => Port1, clientid => ClientId5, expiry => 0, clean_start => true
            }),
            C6 = connect_client(#{
                port => Port2, clientid => ClientId6, expiry => 0, clean_start => true
            }),
            %% offline persistent clients
            ok = emqtt:stop(C3),
            ok = emqtt:stop(C4),

            %% one by one
            QueryParams1 = #{limit => "1"},
            Res1 = list_all_v2(APIPort, QueryParams1),
            ?assertMatch(
                [
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 1,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 1,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 1,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 1,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 1,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := false,
                                <<"count">> := 1
                            }
                    }
                ],
                Res1
            ),
            assert_contains_clientids(Res1, AllClientIds),

            %% Reusing the same cursors yield the same pages
            traverse_in_reverse_v2(APIPort, QueryParams1, Res1),

            %% paging
            QueryParams2 = #{limit => "4"},
            Res2 = list_all_v2(APIPort, QueryParams2),
            ?assertMatch(
                [
                    #{
                        <<"data">> := [_, _, _, _],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 4,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_, _],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := false,
                                <<"count">> := 2
                            }
                    }
                ],
                Res2
            ),
            assert_contains_clientids(Res2, AllClientIds),
            traverse_in_reverse_v2(APIPort, QueryParams2, Res2),

            QueryParams3 = #{limit => "2"},
            Res3 = list_all_v2(APIPort, QueryParams3),
            ?assertMatch(
                [
                    #{
                        <<"data">> := [_, _],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 2,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_, _],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 2,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_, _],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := false,
                                <<"count">> := 2
                            }
                    }
                ],
                Res3
            ),
            assert_contains_clientids(Res3, AllClientIds),
            traverse_in_reverse_v2(APIPort, QueryParams3, Res3),

            %% fuzzy filters
            QueryParams4 = #{limit => "100", like_clientid => "ca"},
            Res4 = list_all_v2(APIPort, QueryParams4),
            ?assertMatch(
                [
                    #{
                        <<"data">> := [_, _, _],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := false,
                                <<"count">> := 3
                            }
                    }
                ],
                Res4
            ),
            assert_contains_clientids(Res4, [ClientId1, ClientId4, ClientId5]),
            traverse_in_reverse_v2(APIPort, QueryParams4, Res4),
            QueryParams5 = #{limit => "1", like_clientid => "ca"},
            Res5 = list_all_v2(APIPort, QueryParams5),
            ?assertMatch(
                [
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 1,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := true,
                                <<"count">> := 1,
                                <<"cursor">> := _
                            }
                    },
                    #{
                        <<"data">> := [_],
                        <<"meta">> :=
                            #{
                                <<"hasnext">> := false,
                                <<"count">> := 1
                            }
                    }
                ],
                Res5
            ),
            assert_contains_clientids(Res5, [ClientId1, ClientId4, ClientId5]),
            traverse_in_reverse_v2(APIPort, QueryParams5, Res5),

            lists:foreach(
                fun(C) ->
                    {_, {ok, _}} =
                        ?wait_async_action(
                            emqtt:stop(C),
                            #{?snk_kind := emqx_cm_clean_down}
                        )
                end,
                [C1, C2, C5, C6]
            ),

            %% Verify that a malicious cursor that could generate an atom on the node is
            %% rejected
            EvilAtomBin0 = <<131, 100, 0, 5, "some_atom_that_doesnt_exist_on_the_remote_node">>,
            EvilAtomBin = emqx_base62:encode(EvilAtomBin0),

            ?assertMatch(
                {error, {{_, 400, _}, _, #{<<"message">> := <<"bad cursor">>}}},
                list_v2_request(APIPort, #{limit => "1", cursor => EvilAtomBin})
            ),
            %% Verify that the atom was not created
            erpc:call(N1, fun() ->
                ?assertError(badarg, binary_to_term(EvilAtomBin0, [safe]))
            end),
            ?assert(is_atom(binary_to_term(EvilAtomBin0))),

            lists:foreach(
                fun(ClientId) ->
                    ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId])
                end,
                AllClientIds
            ),

            ok
        end,
        []
    ),
    ok.

t_cursor_serde_prop(_Config) ->
    ?assert(proper:quickcheck(cursor_serde_prop(), [{numtests, 100}, {to_file, user}])).

cursor_serde_prop() ->
    ?FORALL(
        NumNodes,
        range(1, 10),
        ?FORALL(
            Cursor,
            list_clients_cursor_gen(NumNodes),
            begin
                Nodes = lists:seq(1, NumNodes),
                Bin = emqx_mgmt_api_clients:serialize_cursor(Cursor),
                Res = emqx_mgmt_api_clients:parse_cursor(Bin, Nodes),
                ?WHENFAIL(
                    ct:pal("original:\n  ~p\nroundtrip:\n  ~p", [Cursor, Res]),
                    {ok, Cursor} =:= Res
                )
            end
        )
    ).

list_clients_cursor_gen(NumNodes) ->
    oneof([
        lists_clients_ets_cursor_gen(NumNodes),
        lists_clients_ds_cursor_gen()
    ]).

-define(CURSOR_TYPE_ETS, 1).
-define(CURSOR_TYPE_DS, 2).

lists_clients_ets_cursor_gen(NumNodes) ->
    ?LET(
        {NodeIdx, Cont},
        {range(1, NumNodes), oneof([undefined, tuple()])},
        #{
            type => ?CURSOR_TYPE_ETS,
            node => NodeIdx,
            node_idx => NodeIdx,
            cont => Cont
        }
    ).

lists_clients_ds_cursor_gen() ->
    ?LET(
        Iter,
        oneof(['$end_of_table', list(term())]),
        #{
            type => ?CURSOR_TYPE_DS,
            iterator => Iter
        }
    ).

time_string_to_epoch_millisecond(DateTime) ->
    time_string_to_epoch(DateTime, millisecond).

time_string_to_epoch(DateTime, Unit) when is_binary(DateTime) ->
    try binary_to_integer(DateTime) of
        TimeStamp when is_integer(TimeStamp) -> TimeStamp
    catch
        error:badarg ->
            calendar:rfc3339_to_system_time(
                binary_to_list(DateTime), [{unit, Unit}]
            )
    end.

get_mqtt_port(Node, Type) ->
    {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
    Port.

request(Method, Path, Params) ->
    request(Method, Path, Params, _QueryParams = "").

request(Method, Path, Params, QueryParams) ->
    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
    Opts = #{return_all => true},
    case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of
        {ok, {Status, Headers, Body0}} ->
            Body = maybe_json_decode(Body0),
            {ok, {Status, Headers, Body}};
        {error, {Status, Headers, Body0}} ->
            Body =
                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
                        Msg = maybe_json_decode(Msg0),
                        Decoded0#{<<"message">> := Msg};
                    {ok, Decoded0} ->
                        Decoded0;
                    {error, _} ->
                        Body0
                end,
            {error, {Status, Headers, Body}};
        Error ->
            Error
    end.

maybe_json_decode(X) ->
    case emqx_utils_json:safe_decode(X, [return_maps]) of
        {ok, Decoded} -> Decoded;
        {error, _} -> X
    end.

list_request(Port) ->
    list_request(Port, _QueryParams = "").

list_request(Port, QueryParams) ->
    Host = "http://127.0.0.1:" ++ integer_to_list(Port),
    Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]),
    request(get, Path, [], QueryParams).

list_v2_request(Port, QueryParams = #{}) ->
    Host = "http://127.0.0.1:" ++ integer_to_list(Port),
    Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]),
    QS = uri_string:compose_query(maps:to_list(emqx_utils_maps:binary_key_map(QueryParams))),
    request(get, Path, [], QS).

list_all_v2(Port, QueryParams = #{}) ->
    do_list_all_v2(Port, QueryParams, _Acc = []).

do_list_all_v2(Port, QueryParams, Acc) ->
    case list_v2_request(Port, QueryParams) of
        {ok, {{_, 200, _}, _, Resp = #{<<"meta">> := #{<<"cursor">> := Cursor}}}} ->
            do_list_all_v2(Port, QueryParams#{cursor => Cursor}, [Resp | Acc]);
        {ok, {{_, 200, _}, _, Resp = #{<<"meta">> := #{<<"hasnext">> := false}}}} ->
            lists:reverse([Resp | Acc]);
        Other ->
            error(
                {unexpected_response, #{
                    acc_so_far => Acc,
                    response => Other,
                    query_params => QueryParams
                }}
            )
    end.

lookup_request(ClientId) ->
    lookup_request(ClientId, 18083).

lookup_request(ClientId, Port) ->
    Host = "http://127.0.0.1:" ++ integer_to_list(Port),
    Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]),
    request(get, Path, []).

assert_single_client(Opts) ->
    #{
        api_port := APIPort,
        clientid := ClientId,
        node := Node,
        status := Connected
    } = Opts,
    IsConnected =
        case Connected of
            connected -> true;
            disconnected -> false
        end,
    ?retry(
        100,
        20,
        ?assertMatch(
            {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}},
            list_request(APIPort)
        )
    ),
    ?retry(
        100,
        20,
        ?assertMatch(
            {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}},
            list_request(APIPort, "node=" ++ atom_to_list(Node)),
            #{node => Node}
        )
    ),
    ?assertMatch(
        {ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}},
        lookup_request(ClientId, APIPort)
    ),
    ok.

connect_client(Opts) ->
    Defaults = #{
        expiry => 30,
        clean_start => false
    },
    #{
        port := Port,
        clientid := ClientId,
        clean_start := CleanStart,
        expiry := EI
    } = maps:merge(Defaults, Opts),
    {ok, C} = emqtt:start_link([
        {port, Port},
        {proto_ver, v5},
        {clientid, ClientId},
        {clean_start, CleanStart},
        {properties, #{'Session-Expiry-Interval' => EI}}
    ]),
    {ok, _} = emqtt:connect(C),
    C.

assert_contains_clientids(Results, ExpectedClientIds) ->
    ContainedClientIds = [
        ClientId
     || #{<<"data">> := Rows} <- Results,
        #{<<"clientid">> := ClientId} <- Rows
    ],
    ?assertEqual(
        lists:sort(ExpectedClientIds),
        lists:sort(ContainedClientIds),
        #{results => Results}
    ).

traverse_in_reverse_v2(APIPort, QueryParams0, Results) ->
    Cursors0 =
        lists:map(
            fun(#{<<"meta">> := Meta}) ->
                maps:get(<<"cursor">>, Meta, <<"wontbeused">>)
            end,
            Results
        ),
    Cursors1 = [<<"none">> | lists:droplast(Cursors0)],
    DirectOrderClientIds = [
        ClientId
     || #{<<"data">> := Rows} <- Results,
        #{<<"clientid">> := ClientId} <- Rows
    ],
    ReverseCursors = lists:reverse(Cursors1),
    do_traverse_in_reverse_v2(
        APIPort, QueryParams0, ReverseCursors, DirectOrderClientIds, _Acc = []
    ).

do_traverse_in_reverse_v2(_APIPort, _QueryParams0, _Cursors = [], DirectOrderClientIds, Acc) ->
    ?assertEqual(DirectOrderClientIds, Acc);
do_traverse_in_reverse_v2(APIPort, QueryParams0, [Cursor | Rest], DirectOrderClientIds, Acc) ->
    QueryParams = QueryParams0#{cursor => Cursor},
    Res0 = list_v2_request(APIPort, QueryParams),
    ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := _}}}, Res0),
    {ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0,
    ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows],
    do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc).
